Перейти к основному содержимому
Версия: 1.7.5

Использование SDK

Минимальный пример

from module_sdk.app.app import AppModule
from module_sdk.models.collection_host import CollectionHost
from module_sdk.storage.storage import scan_status_storage


def collect_data():
host = CollectionHost(
name="server-01.example.com",
responsible="00000000-0000-0000-0000-000000000000",
display_name="Сервер приложений #1",
operation_system="Ubuntu 22.04 LTS"
)
scan_status_storage.append_host_in_storage(host)


if __name__ == "__main__":
AppModule.app_create_and_do_py(collect_data)

Полный пример с сетевыми интерфейсами и пакетами

from module_sdk.app.app import AppModule
from module_sdk.models.collection_host import CollectionHost
from module_sdk.models.net_interfaces import NetInterfaces
from module_sdk.models.package import Package
from module_sdk.models.network_service import NetworkService
from module_sdk.storage.storage import scan_status_storage


def collect_data():
iface = NetInterfaces(
name="eth0",
ipv4="192.168.1.10",
mac_addr="00:1A:2B:3C:4D:5E",
fqdn="server-01.example.com"
)

pkg = Package(
name="nginx",
version="1.24.0",
architecture="x86_64",
vendor="Nginx Inc."
)

svc = NetworkService(
name="https",
port=443,
protocol="tcp",
state="open",
version="nginx/1.24.0"
)

host = CollectionHost(
name="server-01.example.com",
responsible="00000000-0000-0000-0000-000000000000",
display_name="Web-сервер",
operation_system="Ubuntu 22.04 LTS",
net_interfaces=[iface],
packages=[pkg],
network_services=[svc],
metadata={
"source": "my-connector",
"imported_at": "2025-01-15T10:00:00Z"
}
)

scan_status_storage.append_host_in_storage(host)


if __name__ == "__main__":
AppModule.app_create_and_do_py(collect_data)

Что происходит внутри app_create_and_do_py

  1. Настраивается логирование (уровень через LOG_LEVEL env, по умолчанию INFO)

  2. В отдельном потоке запускается HTTP-сервер (waitress + connexion) на порту из конфига

  3. Статус переводится в RUN

  4. Вызывается ваша функция

  5. Если в storage есть ассеты -- они упаковываются в пакет

  6. Если есть пакеты для доставки -- статус переходит в ASSETS_READY, иначе -- FINISH

  7. При исключении -- статус переходит в ERROR

Чанкинг данных

SDK автоматически разбивает данные на пакеты:

  • Параметр max_chunk_size (по умолчанию 500) задает порог

  • При добавлении ассета через append_*_in_storage() SDK проверяет общее количество

  • Когда суммарное количество ассетов (host + account + network + external_address + global_metadata) >= max_chunk_size, текущий набор сериализуется в JSON-файл и добавляется в очередь пакетов

  • Keeper забирает пакеты по одному через GET /scan/status и подтверждает через POST /scan/got-packet

Добавление разных типов ассетов

from module_sdk.storage.storage import scan_status_storage
from module_sdk.models.collection_host import CollectionHost
from module_sdk.models.collection_account import CollectionAccount
from module_sdk.models.collection_network import CollectionNetwork
from module_sdk.models.collection_external_address import CollectionExternalAddress

# Хосты
scan_status_storage.append_host_in_storage(CollectionHost(...))

# Аккаунты
scan_status_storage.append_account_in_storage(CollectionAccount(...))

# Сети
scan_status_storage.append_network_in_storage(CollectionNetwork(...))

# Внешние адреса (по одному)
scan_status_storage.append_external_address_in_storage(CollectionExternalAddress(...))

# Внешние адреса (списком)
scan_status_storage.extend_external_address_in_storage([
CollectionExternalAddress(...),
CollectionExternalAddress(...)
])

Отправка статусов выполнения

Модуль может отправлять статусы в реальном времени. Они отображаются в UI в карточке выполнения автоправила и помогают отслеживать прогресс и диагностировать ошибки.

from module_sdk import status_sender, StatusType

def collect_data():
status_sender.send_status(StatusType.INFO, "Запуск сканирования")

try:
status_sender.send_status(StatusType.INFO, "Подключение к серверу")
connection = connect(host, port)
status_sender.send_status(StatusType.INFO, "Подключение установлено")

results = scan(connection)
status_sender.send_status(
StatusType.INFO,
f"Сканирование завершено. Найдено {len(results)} объектов",
data={"count": len(results)}
)

except AuthenticationError as e:
status_sender.send_status(StatusType.ERROR, str(e), code="CREDENTIALS_ERROR")
raise
except ConnectionError as e:
status_sender.send_status(StatusType.ERROR, str(e), code="CONNECTION_ERROR")
raise
except Exception as e:
status_sender.send_status(StatusType.ERROR, str(e), code="MODULE_ERROR")
raise

Рекомендуемые коды ошибок:

КодОписание
CREDENTIALS_ERRORОшибка аутентификации / некорректные учетные данные
CONNECTION_ERRORОшибка сетевого подключения
TIMEOUTПревышено время ожидания
EXECUTION_ERRORОшибка выполнения команды
MODULE_ERRORПрочие ошибки модуля

Подробнее -- см. Справочник API: Статусы выполнения.